package com.niit.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkRDD_Transform_01 {

  def main(args: Array[String]): Unit = {
    val sparkConf =  new SparkConf().setMaster("local[*]").setAppName("Transform")
    val sc = new SparkContext(sparkConf)
    //1.转换算子之 map
    //生成1-4的RDD
    val rdd = sc.makeRDD(1 to 4)
    // 1 2 3 4 => 2 4 6 8
      //一个RDD计算后 会生成一个新的RDD
    val mapRDD1:RDD[Int] = rdd.map(_ * 2)
    mapRDD1.collect().foreach(println)
//    rdd.map(x => {
//      println("xxxxxxxxx")
//      x * 2
//    }
//    )

    //2.mapPartitions
    // mapPartitions : 可以以分区为单位进行数据转换操作
   // 但是会将整个分区的数据加载到内存进行引用
    // 如果处理完的数据是不会被释放掉，存在对象的引用。
    // 在内存较小，数据量较大的场合下，容易出现内存溢出。
    val mapRDD2 =  rdd.mapPartitions( iter =>{
      println(">>>>>>>>>>>>>>>>>>>>>")
      iter.map(_*2)
    })

    mapRDD2.collect().foreach(println)

    //案例：获取每个数据分区的最大值 【1，2，3】 【4，5，6】【7，8，9】
    val rdd2 = sc.makeRDD(1 to 9,3)

    val mapRDD3 =  rdd2.mapPartitions(iter => {
          // iter :【1，2，3】   【4，5，6】  【7，8，9】
        List(iter.max).iterator

    })
    mapRDD3.collect().foreach(println)


    //flatMap:RDD的扁平化

   val rdd3 =  sc.makeRDD(List( List(1,2) , List(3,4) ))//==>List(1,2)   List(3,4) ==> List(1,2,3,4)
    //    rdd3.flatMap(_)
    val flatRdd1 = rdd3.flatMap( list =>{
      list
    } )
    flatRdd1.collect().foreach(println)
    //案例1：以空格拆分单词
   val rdd4 = sc.makeRDD(List("Hello Spark" , "Hello Hadoop" ))
   val flatRdd2 = rdd4.flatMap(s=>{
     s.split(" ")
   }
   )

    flatRdd2.collect().foreach(println)

    //glom:将同一个分区的数据直接转换为相同类型的内存数组进行处理，分区不变
    val rdd5 =  sc.makeRDD(List(1,2,3,4),2) //【1，2】  【3，4】
    val glomRDD:RDD[Array[Int]]  = rdd5.glom()//[1,2,3,4]
    glomRDD.collect().foreach(data => println(data.mkString(",")))

    //groupBy
    /*
    将数据根据指定的规则进行分组, 分区默认不变，但是数据会被打乱重新组合，我们将这样
  的操作称之为 shuffle。极限情况下，数据可能被分在同一个分区中一个组的数据在一个分
  区中，但是并不是说一个分区中只有一个组
     */
    val rdd6 =  sc.makeRDD(List(1,2,3,4),2) //【1，2】  【3，4】

    // groupBy会将数据源中的每一个数据进行分组判断，根据返回的分组key进行分组
    // 相同的key值的数据会放置在一个组中
    def groupByFunction(num:Int): Int ={
      num % 2
    }
    val groupRDD:RDD[(Int,Iterable[Int])] =  rdd6.groupBy(groupByFunction)
    groupRDD.collect().foreach(println)

    sc.stop()
  }

}
